<?php
namespace app\common\job;

use app\common\model\RoymqMessageModel;
use think\cache\driver\Redis;
use think\facade\Log;
use think\queue\Job;

class MessageJob
{
    /**
     * Queue handler that persists an outgoing message and appends it to the
     * Redis-cached list of its queue (or queue@topic).
     *
     * Flow:
     *  - Jobs without `MsgBody` or `QueueName` are dropped immediately.
     *  - Every key of $data is passed through unhump() before the row is
     *    stored (presumably camelCase -> snake_case; confirm against the helper).
     *  - On a successful insert the message is appended to the JSON list cached
     *    under md5(QueueName[@TopicName]) and the job is deleted.
     *  - On a failed insert the job is released for another try after 3 seconds,
     *    or deleted once it has been attempted more than 10 times.
     *
     * NOTE(review): the cache update is a read-modify-write and is not atomic;
     * concurrent workers writing to the same queue key can overwrite each other.
     *
     * @author 贺强
     * @time   2022/6/24 14:49
     * @param Job   $job  Current queue job
     * @param array $data Message parameters (expects `MsgBody`, `QueueName`, optional `TopicName`)
     * @return bool True when the message was stored and cached, false otherwise
     */
    public function send_msg(Job $job, array $data) : bool
    {
        // Invalid payloads can never succeed, so drop them instead of retrying.
        if (empty($data['MsgBody']) || empty($data['QueueName'])) {
            $job->delete();
            return false;
        }

        $data_new = [];
        foreach ($data as $k => $v) {
            $data_new[unhump($k)] = $v;
        }

        $res = RoymqMessageModel::add($data_new);
        if ($res) {
            // Cache key: md5 of "QueueName" or "QueueName@TopicName".
            $queue = $data['QueueName'];
            if (!empty($data['TopicName'])) {
                $queue = $queue . '@' . $data['TopicName'];
            }
            $queue = md5($queue);

            $redis = new Redis();

            // Single read instead of has()+get(): avoids an extra round trip and
            // the window in which the key could expire between the two calls.
            $cached   = $redis->get($queue);
            $messages = is_string($cached) ? json_decode($cached, true) : null;

            // A missing, corrupted or non-list payload must not abort the job
            // after the row has already been inserted; start a fresh list instead.
            if (!is_array($messages)) {
                $messages = [];
            }

            $messages[] = $data_new;
            $redis->set($queue, json_encode($messages));

            $job->delete();
            return true;
        }

        // Insert failed: give up after more than 10 attempts.
        if ($job->attempts() > 10) {
            $job->delete();
            return false;
        }

        // Otherwise put the job back on the queue and retry in 3 seconds.
        $job->release(3);
        return false;
    }

    /**
     * Queue handler that records the reception of a message by updating the
     * stored row identified by `id` with the given data.
     *
     * Flow:
     *  - Jobs without `id` or `receive_time` are dropped immediately.
     *  - On a successful update the job is deleted.
     *  - On a failed update the job is released for another try after 3 seconds;
     *    once it has been attempted more than 10 times the failure is logged
     *    and the job is deleted.
     *  - Any exception raised while processing is logged and the job is deleted.
     *
     * @author 贺强
     * @time   2022/6/28 16:56
     * @param Job   $job  Current queue job
     * @param array $data Update parameters (expects `id` and `receive_time`)
     * @return bool True when the row was updated, false otherwise
     */
    public function receive(Job $job, array $data) : bool
    {
        if (empty($data['id']) || empty($data['receive_time'])) {
            $job->delete();
            return false;
        }

        try {
            $res = RoymqMessageModel::modify($data, ['id' => $data['id']]);
            if ($res) {
                $job->delete();
                return true;
            }

            if ($job->attempts() > 10) {
                // Log the failure directly rather than throwing an exception that
                // is caught a few lines below; that path deleted the job twice.
                $error = ['jobName' => $job->getName(), 'data' => $data];
                Log::error('执行次数超限:' . json_encode($error));
                $job->delete();
                return false;
            }

            // Retry in 3 seconds.
            $job->release(3);
            return false;
        } catch (\Exception $e) {
            Log::error($e->getMessage());
            $job->delete();
            return false;
        }
    }
}
